/**
 * FileName: MutiCols2ColProcessImpl
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/3/20 11:17
 * Description: Process implementation that merges multiple columns into a single string column named "value".
 */
package cn.com.bonc.process.impl;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.process.Process;
import cn.com.bonc.util.ProcessCfgUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.apache.spark.sql.functions.concat_ws;

/**
 * {@link Process} implementation that collapses several columns of a dataset
 * into one string column named {@code value}.
 */
public class MutiCols2ColProcessImpl implements Process {

    /**
     * Concatenates columns of {@code rowDataset} into a single {@code value} column.
     *
     * <p>The separator is read from {@link ConfigurationManager} under the key
     * {@link Constants#DEFAULT_DATA_SEPARATOR}. When
     * {@code ProcessCfgUtil.getSinkCols()} is non-null, those columns are joined;
     * otherwise the columns produced by {@code ProcessCfgUtil.getCols(...)} from
     * the dataset's own column names are joined.
     *
     * @param rowDataset the input dataset
     * @return a dataset with exactly one selected column, {@code value}, holding
     *         the separator-joined result cast to string
     */
    @Override
    public Dataset<Row> processing(Dataset<Row> rowDataset) {
        // Separator placed between the merged column values.
        final String delimiter = ConfigurationManager.getProperty(Constants.DEFAULT_DATA_SEPARATOR);

        // No explicit output columns configured: fall back to the dataset's own columns.
        if (ProcessCfgUtil.getSinkCols() == null) {
            return rowDataset.select(
                    concat_ws(delimiter, ProcessCfgUtil.getCols(rowDataset.columns()))
                            .cast("string")
                            .as("value"));
        }

        // Output columns are configured: merge exactly those into the value column.
        return rowDataset.select(
                concat_ws(delimiter, ProcessCfgUtil.getSinkCols())
                        .cast("string")
                        .as("value"));
    }
}
